#include "config.h"

#include <sys/mman.h>
#include <libaio.h>
#include <errno.h>
#include <ctype.h>

#define DBG_SUBSYS S_LIBREPLICA

#include "replica.h"
#include "job_dock.h"
#include "sysy_lib.h"
#include "clock_mem.h"
#include "clock.h"
#include "squeue.h"
#include "disk.h"
#include "net_global.h"
#include "dbg.h"

typedef clock_entry_t entry_t;

static int __clock_rdlock(clock_mem_t *md)
{
        if (likely(md->priv)) {
                return plock_rdlock(&md->u.plock);
        } else {
                return sy_rwlock_rdlock(&md->u.rwlock);
        }
}

static int __clock_wrlock(clock_mem_t *md)
{
        if (likely(md->priv)) {
                return plock_wrlock(&md->u.plock);
        } else {
                return sy_rwlock_wrlock(&md->u.rwlock);
        }
}

static void __clock_unlock(clock_mem_t *md)
{
        if (likely(md->priv)) {
                plock_unlock(&md->u.plock);
        } else {
                sy_rwlock_unlock(&md->u.rwlock);
        }
}

static uint32_t __key(const void *args)
{
        const chkid_t *id = args;

        return id->id * (1 + id->idx);
}

static int __cmp(const void *v1, const void *v2)
{
        const entry_t *ent = (entry_t *)v1;

        return chkid_cmp(&ent->id, (const chkid_t *)v2);
}

int clock_mem_init(clock_mem_t **_md, const char *name, int private)
{
        int ret;
        clock_mem_t *md;

        if (gloconf.clock_unsafe) {
                DERROR("clock unsafe on\n");
        }

        ret = ymalloc((void **)&md, sizeof(*md));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        memset(md, 0x0, sizeof(*md));

        DBUG("size %llu\n", (LLU)sizeof(*md));

        if (private) {
                ret = plock_init(&md->u.plock, "clock_mem");
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        } else {
                ret = sy_rwlock_init(&md->u.rwlock, "clock_mem");
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        md->tab = hash_create_table(__cmp, __key, name);
        if (md->tab == NULL) {
                ret = ENOMEM;
                GOTO(err_ret, ret);
        }

        md->priv = private;
        YASSERT(clock_table_size);
        DINFO("init clock table %s size %u\n", name, clock_table_size);
        hashtable_resize(md->tab, clock_table_size);

        *_md = md;

        return 0;
err_ret:
        return ret;
}

static int __clock_mem_add(clock_mem_t *md, const chkid_t *chkid)
{
        int ret;
        entry_t *ent;

        ret = __clock_wrlock(md);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ent = hash_table_find(md->tab, chkid);
        if (ent == NULL) {
                ret = ymalloc1((void **)&ent, sizeof(*ent));
                if (unlikely(ret))
                        GOTO(err_lock, ret);

                memset(ent, 0x0, sizeof(*ent));
                ent->id = *chkid;

                ret = sy_spin_init(&ent->lock);
                if (unlikely(ret))
                        UNIMPLEMENTED(__DUMP__);

                ret = hash_table_insert(md->tab, (void *)ent, &ent->id, 0);
                YASSERT(ret == 0);
        }

        __clock_unlock(md);

        return 0;
err_lock:
        __clock_unlock(md);
err_ret:
        return ret;
}

int clock_mem_vclock_set(clock_mem_t *md, const chkid_t *id, const vclock_t *vclock, int dirty)
{
        int ret, retry = 0;
        entry_t *ent;

retry:
        ret = __clock_rdlock(md);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ent = hash_table_find(md->tab, id);
        if (ent == NULL) {
                __clock_unlock(md);

                retry++;
                YASSERT(retry < 10);
                __clock_mem_add(md, id);

                goto retry;
        }

        //YASSERT(ent->version <= version);
        if (likely(md->priv)) {
                ent->vclock = *vclock;
                ent->dirty = dirty;
        } else {
                ret = sy_spin_lock(&ent->lock);
                if (unlikely(ret))
                        GOTO(err_lock, ret);
        
                ent->vclock = *vclock;
                ent->dirty = dirty;

                sy_spin_unlock(&ent->lock);
        }

        //DBUG("set chunk "CHKID_FORMAT" version %llu master %s\n", CHKID_ARG(id),
        //(LLU)vclock->version, network_rname(&vclock->nid.id));

        __clock_unlock(md);

        return 0;
err_lock:
        __clock_unlock(md);
err_ret:
        return ret;
}

int clock_mem_vclock_get(clock_mem_t *md, const chkid_t *id, vclock_t *vclock, int *dirty)
{
        int ret;
        entry_t *ent;

        ret = __clock_rdlock(md);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ent = hash_table_find(md->tab, id);
        if (ent == NULL) {
#if 0
                if (gloconf.clock_unsafe && id->type == __RAW_CHUNK__) {
                        vclock->clock = 0;
                        vclock->vfm = 0;
                        *dirty = 0;
                        goto out;
                } else {
                        DBUG("get chunk "CHKID_FORMAT" not found\n", CHKID_ARG(id));
                }
#endif

                ret = ENOENT;
                goto err_lock;
        }

        *vclock = ent->vclock;
        *dirty = ent->dirty;

        DBUG("get chunk "CHKID_FORMAT" clock %llu\n", CHKID_ARG(id), (LLU)vclock->clock);

//out:
        __clock_unlock(md);

        return 0;
err_lock:
        __clock_unlock(md);
err_ret:
        return ret;
}

int clock_mem_remove(clock_mem_t *md, const chkid_t *chkid)
{
        int ret;

        ret = __clock_wrlock(md);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        hash_table_remove(md->tab, chkid, NULL);

        __clock_unlock(md);

        return 0;
err_ret:
        return ret;
}

int clock_mem_add(clock_mem_t *md, const chkid_t *id, const vclock_t *vclock)
{
        int ret;
        entry_t *ent;

        ret = __clock_wrlock(md);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ent = hash_table_find(md->tab, id);
        if (ent) {
                ret = EEXIST;
                GOTO(err_lock, ret);
        } else {
                ret = ymalloc1((void **)&ent, sizeof(*ent));
                if (unlikely(ret))
                        GOTO(err_lock, ret);

                ent->id = *id;
                ent->vclock = *vclock;

                ret = sy_spin_init(&ent->lock);
                if (unlikely(ret))
                        GOTO(err_lock, ret);
                
                ret = hash_table_insert(md->tab, (void *)ent, &ent->id, 0);
                YASSERT(ret == 0);
        }

        __clock_unlock(md);

        return 0;
err_lock:
        __clock_unlock(md);
err_ret:
        return ret;
}

int clock_mem_iterator(clock_mem_t *md,  int (*handler)(void *, void *), void *ctx)
{
        int ret;

        ret = __clock_rdlock(md);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = hash_iterate_table_entries(md->tab, handler, ctx);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __clock_unlock(md);

        return 0;
err_lock:
        __clock_unlock(md);
err_ret:
        return ret;
}

static void __clock_mem_destroy(void *_ent)
{
        entry_t *ent = _ent;

        yfree1((void **)&ent);
}

void clock_mem_destroy(clock_mem_t *md)
{
        int ret;

        ret = __clock_wrlock(md);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        hash_destroy_table(md->tab, __clock_mem_destroy);

        __clock_unlock(md);

        yfree1((void **)&md);
}
